package com.sisyphus.networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


// 定义输入数据的样例类
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

/**
 * 统计3：实时流量访问统计：统计一小时内的浏览总数（PV）
 * 基本需求
 * - 从埋点日志中，统计实时的PV和UV
 * - 统计每小时的访问量(PV)，并且对用户进行去重(UV)。
 * 解决思路
 * - 统计埋点日志中的PV行为，利用Set数据结构去重
 * - 对于大规模的数据，可以考虑用布隆过滤器去重
 */
object PageView {
  def main(args: Array[String]): Unit = {
    // 1. env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 2. source
    // 使用相对路径定义数据源
    val resource = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(resource.getPath)

    // 3. transformation
    val res = dataStream.map(data => {
      val dataArray = data.split(",")
      UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
    })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.behavior == "pv") // 只统计pv操作
      .map(data => ("pv", 1))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .sum(1)

    // 4. sink
    res.print("pv count")

    // 5. execute
    env.execute("page view job")
  }
}
